feat: Add experimental support for accelerated PyArrow UDFs#4234
feat: Add experimental support for accelerated PyArrow UDFs#4234andygrove wants to merge 49 commits into
Conversation
When Comet operators produce Arrow columnar data and the next operator is a Python UDF (mapInArrow/mapInPandas), Spark currently inserts an unnecessary ColumnarToRow transition. The Python runner then converts those rows back to Arrow to send to Python, creating a wasteful Arrow->Row->Arrow round-trip. This adds CometPythonMapInArrowExec which: - Accepts columnar input directly from Comet operators - Uses lightweight batch.rowIterator() instead of UnsafeProjection - Keeps the Python output as ColumnarBatch (no output row conversion) The optimization is detected in EliminateRedundantTransitions and controlled by spark.comet.exec.pythonMapInArrow.enabled (default: true).
Documents the CometPythonMapInArrowExec optimization, including supported APIs, configuration, usage example, and how to verify the optimization is active in query plans.
…ions Fix three issues that prevented test_pyarrow_udf.py from running: 1. mapInArrow callbacks must accept Iterator[pa.RecordBatch] and yield batches. The previous single-batch signatures crashed with "'map' object has no attribute 'to_pandas'". 2. PySpark DataFrame has no `queryExecution` attribute. Use `_jdf.queryExecution().executedPlan().toString()` instead. 3. Replace soft plan-string heuristics with assertions that fail loudly if the optimization regresses. Match on `CometPythonMapInArrow` (no `Exec` suffix in the plan toString) and assert no `ColumnarToRow` transition is present.
- Rewrite test_pyarrow_udf.py as a pytest module. A session-scoped SparkSession fixture builds the Comet-enabled session once and a parametrized `accelerated` fixture toggles spark.comet.exec.pythonMapInArrow.enabled per test, so each case runs under both the optimized and fallback paths and asserts the expected plan operator (`CometPythonMapInArrow` vs vanilla `PythonMapInArrow`). The jar is auto-discovered from spark/target by matching the installed pyspark version, or taken from the COMET_JAR env var. - Add a dedicated `PyArrow UDF Tests` workflow that builds Comet against Spark 3.5 / Scala 2.12, installs pyspark/pyarrow/pandas/pytest, and runs the new pytest module. - Add CometPythonMapInArrowSuite to the `exec` suite list in both pr_build_linux.yml and pr_build_macos.yml so the JVM-side suite is exercised on every PR.
Replace the narrow paths allowlist with the same paths-ignore list used by pr_build_linux.yml so the workflow runs on any source change that could affect Comet's PyArrow UDF execution path, not just the few files explicitly named.
The PR's `CometPythonMapInArrowExec` and `EliminateRedundantTransitions` rule directly reference Spark 3.5 APIs that differ across supported Spark versions: the `ArrowPythonRunner` constructor (4 distinct signatures across 3.4/3.5/4.0/4.1+/4.2), `arrowUseLargeVarTypes`, `JobArtifactSet`, `MapInBatchExec.isBarrier`, and the `PythonMapInArrowExec` type itself (renamed to `MapInArrowExec` in 4.0+). This breaks compile on every profile other than 3.5. Introduce a per-version `ShimCometPythonMapInArrow` trait under `org.apache.spark.sql.comet.shims` (placed in the spark namespace so it can reach `private[spark]` members) that: * matches the Spark-version-specific MapInArrow / MapInPandas exec types and exposes their `(func, output, child, isBarrier, evalType)` tuple, * constructs the right `ArrowPythonRunner` for the version, * hides `arrowUseLargeVarTypes` / `JobArtifactSet` / `getPythonRunnerConfMap` behind helper methods. Spark 3.4 lacks the prerequisite APIs (no `isBarrier`, no `JobArtifactSet`, no `arrowUseLargeVarTypes`), so its shim returns `None` from the matchers and the optimization is a no-op there.
The default `amd64/rust` image is Debian 13 (trixie), where the system `python3` is 3.13 and there is no `python3.11` apt package. The workflow installed `python3.11` explicitly, which fails on trixie with `Unable to locate package python3.11`. Switching to `rust:bookworm` gives a Debian 12 base where `python3` is 3.11, matching the job name and pyspark 3.5.x's supported runtime.
Spark launches Python workers in fresh subprocesses that look up python3 on PATH. Without PYSPARK_PYTHON, workers use the system python (no pyarrow installed) and UDF execution fails with ModuleNotFoundError. Point both PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON at /tmp/venv/bin/python so workers inherit the same interpreter that pytest uses.
Flip spark.comet.exec.pythonMapInArrow.enabled default from true to false and prefix the config doc with "Experimental:" so the default matches the "[experimental]" label on the feature. Update the user guide to instruct users to opt in explicitly.
Add coverage for cases that the original pytest module did not exercise: - mapInPandas (claimed supported, previously zero coverage) - Null preservation across long and string columns via Arrow passthrough - Empty input from a CometScan via filter pushdown - Python exception propagation (sentinel must surface in driver-side error) - DecimalType(18,6), DateType, TimestampType round-trip with nulls - ArrayType<Int> and nested StructType, including null arrays/structs and arrays containing null elements - repartition between scan and UDF (correctness only; the optimization itself does not fire across a vanilla Exchange and is documented as such in the test) Generalize _assert_plan_matches_mode to take the vanilla node name so the fallback assertion can match either PythonMapInArrow or MapInPandas.
Expand the user guide with the limitations a user should know before enabling the experimental optimization: - The remaining row-to-Arrow round-trip inside the Python runner is documented more precisely (the input goes through ColumnarBatch.rowIterator to feed ArrowPythonRunner, which re-encodes to Arrow IPC). - A vanilla Spark Exchange between the Comet scan and the UDF prevents the optimization from firing. Users must configure Comet's native shuffle manager at session startup to keep the data columnar. - Spark 3.4 lacks the prerequisite APIs and the feature is a no-op there. - isBarrier is captured by the operator constructor but not yet propagated to the Python runner. Also explain the AQE display quirk: with AQE on and a shuffle present, the pre-execution plan shows the unoptimized form because the rule only sees the materialized subplan after stage execution. Running an action and re-inspecting explain() reveals the optimized plan.
Standalone Python script that times df.mapInArrow(passthrough).count() and the equivalent mapInPandas query with the optimization toggled on and off. Numbers are wall-clock seconds, so they include the Python worker, Arrow IPC, and downstream count() costs. That is the right unit for a feature whose user surface is Python: it shows what fraction of end-to-end time the optimization shaves off, not just the JVM-side delta in isolation. Three workloads exercise the dimension where the optimization helps most: - narrow primitives (long, int, double) - mixed with strings (variable-length encoding) - wide rows (50 columns, projection cost scales with column count) Local smoke run with 200k rows shows 1.17x to 1.45x speedup across mapInArrow and mapInPandas, narrow/wide schemas. The script is configurable via BENCHMARK_ROWS / BENCHMARK_WARMUP / BENCHMARK_ITERS env vars for users who want longer or shorter runs.
The operator captured isBarrier in its constructor but always called inputRDD.mapPartitionsInternal, dropping the barrier execution mode semantics that mapInArrow(..., barrier=True) requests. Stages running under the optimization lost gang scheduling and the BarrierTaskContext APIs the UDF expects. Branch on isBarrier and route through inputRDD.barrier().mapPartitions in the barrier case, matching what Spark's MapInBatchExec.doExecute does. Add a pytest case that calls BarrierTaskContext.get() inside the UDF, which raises if the task is not running in a barrier stage; runs in both vanilla and optimized modes. Drop the isBarrier limitation note from the user guide.
|
I assume not only speed is improved, would also be interesting to check memory metrics |
parthchandra
left a comment
There was a problem hiding this comment.
Some minor comments. Otherwise great PR!
| * Optimized flow: CometNativeExec (Arrow) -> CometPythonMapInArrowExec (batch.rowIterator() -> | ||
| * Arrow -> Python -> Arrow columnar output) | ||
| * | ||
| * This eliminates: |
| child: SparkPlan, | ||
| isBarrier: Boolean, | ||
| pythonEvalType: Int) | ||
| extends UnaryExecNode |
There was a problem hiding this comment.
Other Comet operators extend CometPlan. The idea was that all Comet specific common behavior (say some Comet specific metrics, for example) can be in a single place.
|
|
||
| jobs: | ||
| pyarrow-udf: | ||
| name: PyArrow UDF (Spark 3.5, JDK 17, Python 3.11) |
There was a problem hiding this comment.
Should this be Spark 4 now?
I'm assuming that this is enabled for only one version of Spark because it is experimental?
There was a problem hiding this comment.
With the latest changes, this feature is now only supported for Spark 4.x. If there is user demand, we can create a separate PR to support for 3.x
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val COMET_PYTHON_MAP_IN_ARROW_ENABLED: ConfigEntry[Boolean] = |
There was a problem hiding this comment.
MAP is confusing IMO, so many things related to map in Spark
There was a problem hiding this comment.
maybe COMET_PYARROW_SUPPORT or something like that?
There was a problem hiding this comment.
Thanks, updated docs to clarify.
…ddress [skip ci] Adds a helper class in comet-common that copies Arrow buffer bytes from a CometDecodedVector to caller-supplied memory addresses without exposing shaded Arrow types across the module boundary. Uses getFieldBuffers()/getChildrenFromFields() traversal consistent with VectorUnloader so buffer ordering matches between source (shaded) and destination (unshaded) sides.
…[skip ci] Rewrites CometColumnarPythonInput to copy Comet vector bytes via CometVectorIpcCopier (long-address API) rather than casting shaded FieldVector to unshaded FieldVector, which caused ClassCastException at runtime. Additional fixes for correct Arrow IPC semantics: - Fill struct validity buffer with 0xFF so Python sees non-null struct rows - Set lastSet before setValueCount on variable-width and list vectors to prevent fillHoles from overwriting correctly copied offset buffers - Process nodes bottom-up so parent setValueCount cascade does not clobber children that have not yet had lastSet updated
|
@comphead @parthchandra @mbutrovich @wForget this is ready for another round of reviews whenever you have time |
The PR description is updated with the new scope |
# Conflicts: # spark/pom.xml # spark/src/main/java/org/apache/comet/vector/CometVectorIpcCopier.java
The long[]-address indirection through CometVectorIpcCopier existed because comet-common shaded org.apache.arrow.* into org.apache.comet.shaded.arrow.*, making source vectors and Spark's IPC root different JVM types. After apache#4325 moved most JVM code into comet-spark and dropped the shading, both sides see the same Arrow classes — the helper is no longer needed. Replace with a direct walk of the source/destination FieldVector trees using ArrowBuf.setBytes for the buffer copy. Same per-buffer memcpy semantics; the cross-RootAllocator constraint that blocks true zero-copy is independent of shading and still tracked in apache#4294.
Adds pytest cases for the data-type branches in CometColumnarPythonInput that were previously unexercised: numeric scalars (boolean/byte/short/float), binary, timestamp NTZ, map, and a deeply nested array/struct combination. Falls back to vanilla Spark when spark.sql.execution.arrow.useLargeVarTypes is enabled. With that conf on, Spark widens StringType/BinaryType to 8-byte-offset variants in the destination IPC root while Comet's source vectors keep 4-byte offsets, so the per-buffer memcpy in copyVector would corrupt the offset buffer. While narrowing the rule to gate on largeVarTypes, also fix a pre-existing greedy match: the MapInBatch case used `p: SparkPlan` with the pyarrow conf as a guard, which matched every plan when the conf was on and consumed the later CometShuffleExchangeExec arm. The case now gates on a structural check via eligibleMapInBatchInfo so unrelated plans flow through.
mbutrovich
left a comment
There was a problem hiding this comment.
Thanks for continuing to refine this, @andygrove! I know this is an experimental feature, but I've got a few more changes and clarifications before we merge this:
Allocator framing
The PR description states:
Comet's Parquet readers each construct their own
RootAllocator, separate fromArrowUtils.rootAllocator, so Arrow'sTransferPaircannot share buffers across the boundary.
The in-code header on CometColumnarPythonInput (lines 1335-1340 of the diff) says the source and destination "live in different RootAllocator trees", and pyarrow-udfs.md (lines 376-379) blames "Comet's Parquet readers allocating from ArrowUtils.rootAllocator (rather than each reader constructing its own independent RootAllocator)".
The actual situation in main looks different from all three:
spark/src/main/scala/org/apache/comet/package.scala:36declares oneRootAllocatorfor the whole process. The doc on lines 29-35 says "we use a single allocator for the whole execution process".grep -rn 'new RootAllocator'over the repo (excluding tests/target) returns only that one line.- For native scan, the Parquet reader is on the Rust side.
native/core/src/parquet/mod.rs:310-311produces an arrow-rsRecordBatchand callsmove_to_spark, which writesFFI_ArrowArray::new(self)into the caller-provided struct (native/core/src/execution/utils.rs:91). The buffers are Rust-allocated and refcounted via Arrow C Data Interface release callbacks. - JVM side imports those via
ArrowImporter.importVector(NativeUtil.scala:264).CometArrowAllocatoris passed to the importer for tracking, but the underlying data buffers are not allocated against it in the Arrow Java sense.
So:
- There is no per-reader JVM
RootAllocatorto point at as the blocker. - The thing that blocks
TransferPairon imported vectors is not allocator separation, it is that imported buffers carry a differentReferenceManagerwhosereleaseroutes through FFI. - The destination root in
PythonArrowInputis a child ofArrowUtils.rootAllocator, which is genuinely a different JVM root, but that asymmetry is downstream of "the source isn't in a JVM allocator tree at all" rather than the framing in the PR.
Could the description and the two doc blocks be rewritten so the follow-up issue (#4294) is grounded in what the code actually does?
PythonArrowInput.allocator visibility differs across Spark versions
Verified by reading sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala at each tag in apache/spark:
- 3.4.x / 3.5.x:
allocatoris a local var insidewriteIteratorToArrowStream, not a field. Different trait shape. - 4.0.0 / 4.0.1:
private val allocator = ArrowUtils.rootAllocator.newChildAllocator(...). Not overridable. - 4.1.1 / 4.1.2:
protected val allocator = .... Overridable. - 4.2.0-preview5:
protected lazy val allocator: BufferAllocator = .... Overridable.
This partly defends the per-buffer copy: on 4.0 the visibility blocks any subclass from sharing a parent allocator with the destination root. On 4.1+ it does not.
Open questions:
- Was the design fixed against 4.0 first, with 4.1/4.2 carried along on the same path for symmetry?
- Is a version-conditional fast path on 4.1+ in scope, or a follow-up?
"Next-best alternative to zero-copy" framing
Across the JVM/Python boundary, Spark's transport is fork plus pipe plus Arrow IPC. The buffer bytes have to be written to a pipe at least once, so true zero-copy isn't on the table without a transport-level change in PySpark. The accurate floor is one copy per batch, the IPC serialize step in MessageSerializer.serialize.
The current path does it twice:
- Comet vector to Spark IPC root vector via
copyVector(diff line 1393-1395). - Spark IPC root to pipe via
VectorUnloaderthenMessageSerializer.serialize(diff line 1405-1410).
A path built on Utils.serializeBatches (spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:209-233) does step 2 directly off Comet's vectors and skips step 1. That utility already builds a VectorSchemaRoot from existing Comet FieldVectors via getBatchFieldVectors (lines 360-388). The Python worker reads the resulting bytes via ArrowStreamReader either way.
Because the source buffers on the native scan path are foreign-imported (see allocator section above), the IPC writer reads bytes through the ArrowBuf memory addresses without any JVM allocator involvement, which removes the apparent allocator asymmetry from the picture entirely.
Is "two copies, dropped to one" a more accurate framing than "next-best to zero-copy"?
allocateNew per batch
copyVector (diff line 1429-1438) calls dst.allocateNew(...) on every destination child every batch. BaseFixedWidthVector.allocateNew(int) in Arrow Java calls clear() first and re-allocates buffers. Spark's vanilla BasicPythonArrowInput reuses the persistent root and lets ArrowWriter.reset() keep buffers attached.
- Has this been measured at high batch rates?
- Would
setValueCount(0)plus a grow-on-demand check on existing buffer capacity buy back the persistent-root behavior?
Per-batch new VectorUnloader
Diff line 1405-1406 constructs new VectorUnloader(root, true, cometCodec, true) per batch. Spark's PythonArrowInput.scala:96 constructs the unloader once as a field and reuses it. Is the per-batch construction load-bearing or hoistable?
cometCodec reimplementation
CometColumnarPythonInput resolves the Arrow compression codec via SQLConf.getConfString("spark.sql.execution.arrow.compression.codec", ...) (diff line 1350-1364) and reimplements the none / lz4 / zstd switch.
Spark's PythonArrowInput.scala:80-95 does the same resolution. The diff comment says the reimplementation is needed because 4.0 "lacks SQLConf.arrowCompressionCodec". SQLConf.arrowCompressionCodec appears in 4.0.0, so it'd be worth double-checking whether the actual blocker was the Lz4CompressionCodec / ZstdCompressionCodec import path from arrow-compression. If so, a ShimSQLConf accessor would let the rest collapse to Spark's idiom.
Three near-duplicate CometArrowPythonRunner files
spark/src/main/spark-{4.0,4.1,4.2}/.../CometArrowPythonRunner.scala, sizes 72/64/63 lines.
- 4.0 is structurally different (extends
BasePythonRunnerdirectly because 4.0'sBaseArrowPythonRunneris bound toIterator[InternalRow]). - 4.1 and 4.2 both extend
BaseArrowPythonRunner[Iterator[ColumnarBatch], ColumnarBatch], differing inrunnerConfoverride (4.2 dropped theworkerConfconstructor arg) andwriteUDFsarity.
Would a small CometArrowPythonRunnerBase in spark-4.x/ covering 4.1 and 4.2 read more like the existing Spark4xMapInBatchSupport pattern?
EliminateRedundantTransitions arm
Diff line 450-461. The pattern guard calls eligibleMapInBatchInfo(p).isDefined and the body calls eligibleMapInBatchInfo(p).get, so the matchers and the conf reads run twice. The other arms in this rule are 1-3 line plain pattern matches. An extractor object with unapply returning Option[(MapInBatchInfo, SparkPlan)] would collapse this to a single arm.
Separately, getConfString("spark.sql.execution.arrow.useLargeVarTypes", "false") at diff line ~496 reads stringly. The repo already has ShimSQLConf per Spark version, where an arrowUseLargeVarTypes(conf): Boolean accessor would fit.
Output / input class compatibility
CometMapInBatchExec.processPartition (diff line 618-621) emits a ColumnarBatch whose columns are plain ArrowColumnVector instances pulled from the struct vector returned by the Python IPC reader. Not CometVector wrappers.
Two places downstream that require CometVector:
CometColumnarPythonInput.writeNextBatchToArrowStream(diff line 1386-1391) on the input side of the same operator class:val src = cometBatch.column(i).asInstanceOf[CometDecodedVector].getValueVector.asInstanceOf[FieldVector]
NativeUtil.exportBatch(spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala:117-169), the JVM-to-Rust FFI handoff used by every native Comet operator. Throws on anything that is notCometSelectionVectororCometVector.
Filters and projections that only reference UDF inputs get pushed past the UDF, so they don't reach this position. Plan shapes that do put ArrowColumnVector into one of those two consumers:
- Two sequential pyarrow UDFs.
df.mapInArrow(udf1, s1).mapInArrow(udf2, s2). Spark does not fuse consecutivemapInArrow/mapInPandascalls, each is its ownMapInArrowExec. With this PR, both rewrite toCometMapInBatchExec, and the second one'sasInstanceOf[CometDecodedVector]on the first input column throwsClassCastExceptionon the first batch. - A filter referencing a UDF output column (
mapInArrow(udf, schema).filter("output_col > 0")). Nothing to push past. - A partial aggregate fed by the UDF output.
CometHashAggregateon UDF output would invokeexportBatchand hit the third branch. - The UDF as the probe side of a join with a Comet build side.
Questions:
- Is the sequential-UDF case caught by a planner guard, or does it reach runtime as
ClassCastException? If the latter, this looks like a correctness bug, not a perf concern. - For the other three shapes, does
exportBatchthrow, or is there a wrap step that re-importsArrowColumnVectorintoCometVector? If a wrap exists, what does it cost (rewrap of the sameFieldVectoris cheap, full copy is not). - None of these arrangements appear in
test_pyarrow_udf.py. SequentialmapInArrow.mapInArrow, filter-on-UDF-output, andmapInArrow.groupBy.aggwould pin the contract.
Spark 3.4 / 3.5 stub shims
The two ShimCometMapInBatch.scala stubs under spark-3.4/ and spark-3.5/ are byte-identical. spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ already exists and houses several shared 3.x shims (ShimDataSourceRDDPartition.scala, ShimCometShuffleWriteProcessor.scala, ShimCometBatchScanExec.scala). The stub fits there and would drop one of the two copies.
Tests
The pytest module covers happy paths well. Compared with CometCodegenFuzzSuite (introduced in #4267), the gaps that look most relevant given this PR's mechanism (recursive vector-tree memcpy, validity-bit handling, per-batch buffer reallocation):
- No fuzz harness.
CometCodegenFuzzSuitegenerates random schemas and runs identity ScalaUDF over every column. The same harness translates: random schema, write parquet,mapInArrow(passthrough), assert row-equivalence. The bulk-copy incopyVectoris exactly the kind of "walk a vector tree and memcpy buffers" code where structured fuzz catches what hand-written cases miss. - Decimal coverage is narrow. Only
DecimalType(18, 6)is tested.BaseFixedWidthVectorhandles short decimals (8 bytes, long-backed) and long decimals (16-byteFixedSizeBinary) on different paths. TheMAX_LONG_DIGITS=18boundary is where bugs hide. Sweeping precision over{1, 9, 17, 18, 19, 28, 38}and scale over{0, half, max}would close it. - Null density. Validity-buffer memcpy is historically where Arrow Java vector copies break. None of the cases cover all-null batches or single-non-null-in-batch. Fuzz over null fraction in
{0.0, 0.01, 0.5, 0.99, 1.0}would close this. - Multi-batch in one partition. Every test fits in a single Arrow IPC batch (max ~100 rows). The IPC root is reused across batches with
allocateNew/setValueCount/setByteseach call, and variable-width data-buffer growth is a separate path. Forcingbatch_size << row_countand asserting correctness across N batches would cover that. - Transforming UDFs for complex types.
array_and_struct,deeply_nested,map_type,binary_typeall use Python-side passthrough. A symmetric encode/decode mistake (wrong offset arithmetic that the inverse undoes) hides there. One mutating UDF per complex type (reverse(array), swap struct fields, drop one map entry) would surface that class of bug. - Wide schemas. Benchmark tests 50 cols but no correctness test does. The bulk-copy walks an
addresses[]array indexed across the whole tree. Off-by-one in flattening logic surfaces at depth times width. A 50-col mixed-type correctness case would close that. - Empty batches mid-stream.
test_map_in_arrow_empty_inputfilters everything at source so the operator sees zero batches. The non-empty stream containing a 0-row batch (e.g. from a selective filter that keeps later rows) is the more interesting case and isn't currently exercised.
Borrowing the CometCodegenFuzzSuite harness would close 1, 3, and 4 in one move. 2, 5, 6, 7 are addable as targeted hand-written cases if the harness is too heavy for this PR.
…nsumers work Wrap each ArrowColumnVector from the Python output as a CometVector via CometVector.getVector before emitting the batch. This makes the operator a proper Comet columnar producer: - a CometMapInBatchExec stacked above another one can cast the input column to CometDecodedVector (as CometColumnarPythonInput already does) - NativeUtil.exportBatch's case match handles the output (CometVector arm, not the SparkException-throwing fallthrough), so a Comet native aggregate or join probe over the UDF output does not blow up at FFI handoff Adds pytest cases that exercise the consumer shapes (chained mapInArrow, filter on UDF output, groupBy/agg on UDF output) plus a Scala plan-level test pinning the chained-rewrite structure. Addresses mbutrovich's correctness comment on apache#4234.
- Move the 3.4 / 3.5 ShimCometMapInBatch stubs into a single spark-3.x shim (they were byte-identical). The matchers still return None on both versions so the rule is a no-op on Spark 3.x. - Replace the eligibleMapInBatchInfo guard + .get unpack with an EligibleMapInBatch extractor that runs the matchers and conf reads once per visited plan. - Add arrowUseLargeVarTypes(conf) to ShimSQLConf so the rule no longer reads the conf stringly. 4.x and 3.5 forward to the typed accessor; 3.4 falls back to getConfString because that version has no accessor. - Hoist the per-batch VectorUnloader in CometColumnarPythonInput to a lazy val. getRecordBatch reads root.getFieldVectors on every call so reuse is safe; this drops one allocation per batch. - Clarify the comment on cometCodec: 4.0.x has no SQLConf.arrowCompressionCodec accessor (added after 4.0 branch was cut), so a typed ShimSQLConf forwarder would still need a stringly fallback for the 4.0 build. The 4.1+ codec instances live in the separate arrow-compression artifact, which Comet does not depend on; the CompressionCodec.Factory path keeps that dependency contained. Addresses mbutrovich's items 5, 6, 8, 10 on apache#4234.
The PR description, CometColumnarPythonInput header, and pyarrow-udfs.md all blamed the per-buffer copy on 'Comet's Parquet readers each constructing their own RootAllocator'. The repo only has one process-wide RootAllocator (CometArrowAllocator), and native scan does Parquet reading on the Rust side: arrow buffers cross the boundary via Arrow C Data Interface, not a JVM allocator. The actual blocker on TransferPair is that imported buffers carry a ReferenceManager whose release routes through FFI, while Spark's destination IPC root is a child of ArrowUtils.rootAllocator. The two reference managers cannot share buffers. Reframe the per-batch work as 'two copies, one structural': - copy 1 (Comet -> destination IPC root) is droppable, tracked in apache#4294 - copy 2 (root -> pipe via VectorUnloader / MessageSerializer) is the structural floor; Spark's transport to Python is fork + pipe + Arrow IPC, so the bytes must reach the pipe at least once Addresses mbutrovich's items 1 and 3 (framing) on apache#4234. The PR description update is a separate step.
Hand-written cases that pin the boundaries mbutrovich called out as gaps: - decimal precision sweep (1, 9, 17, 18, 19, 28, 38; scale 0/half/max) covering the short-decimal (long-backed) and long-decimal (16-byte FixedSizeBinary) paths and the 18/19 boundary - null density sweep (0, 0.01, 0.5, 0.99, 1.0) for validity-buffer memcpy - multi-batch per partition (batch size 16, 4000 rows in 1 partition) so the persistent destination IPC root is exercised across many batches - wide schema (50 cols, mixed primitives + strings + booleans) for the flattened-tree address arithmetic - mid-stream zero-row batch so setValueCount(0) + validity sizing is hit while the iterator continues - transforming array UDF (reverse each list) to catch symmetric encode/decode mistakes that a passthrough would invert A randomised fuzz harness (analogous to CometCodegenFuzzSuite) is the right next step for the recursive vector-tree walk; deferred to a separate follow-on.
CometSparkSessionExtensions.isCometLoaded short-circuits the whole extension (returning false; no rules registered) when spark.comet.exec.shuffle.enabled is true but spark.shuffle.manager is not Comet's manager. The pytest conftest only sets the basic Comet configs, so this guard fired and CometScanRule never ran. The plan stayed vanilla Parquet, the rewrite chain never had a Comet columnar producer to match, and every [accelerated] assertion that checks for CometMapInBatch failed. These tests do not exercise shuffle, so disable Comet shuffle in the session. Comet's scan and exec rules then run normally and the rewrite fires. Diagnoses the wholesale PyArrow UDF Spark 4.0 CI failure on apache#4234.
|
Item 7 (CometArrowPythonRunner dedupe across 4.1/4.2). Took a closer look. The shared surface ends up smaller than the diff suggests because Spark's
A shared Item 11 (fuzz harness). Borrowing the For this round I've added the targeted gaps in commit a520321:
Also filed #4383 for item 3 (drop the per-batch Comet→Spark buffer copy via direct |
Which issue does this PR close?
Closes #957
Closes #4240
Rationale for this change
Spark's
mapInArrow/mapInPandasplans insertColumnarToRowbetween a Comet scan and the Python operator, andMapInBatchExecthen projects the Python output back to rows. Both projections copy data per row even though the upstream Comet plan and the Python runner both speak Arrow. Inside the Python runner,ArrowPythonRunnerthen re-encodes those rows into Arrow IPC, so the data round-trips Arrow → Row → Arrow on the JVM side before ever reaching Python.This PR replaces the row-based plumbing with a custom
CometArrowPythonRunnerthat consumesIterator[ColumnarBatch]directly. The destinationVectorSchemaRoot(in Spark's allocator, used for the IPC stream) is populated per batch by a singleUnsafe.copyMemoryper buffer per column, copying straight from Comet's vectors into the runner's root. No row materialization, noArrowWriter.write(InternalRow)loop.Not zero-copy in the strict sense: Comet's Parquet readers each construct their own
RootAllocator, separate fromArrowUtils.rootAllocator, so Arrow'sTransferPaircannot share buffers across the boundary. Bulk per-buffer memcpy is the next-best alternative and is materially faster than the per-row write loop, especially on wide rows where the row path is dominated by theInternalRowaccessor overhead. A future PR that unifies the allocator parent would unlock true zero-copy viaTransferPair.I/O asymmetry: why memcpy on input but not on output
The input path memcpys; the output path does not, and that is structural rather than an oversight.
ColumnarBatch, allocated from each Parquet reader's privateRootAllocator. The destination is Spark's persistent IPC root, allocated fromArrowUtils.rootAllocator. Because the two roots are unrelated, Arrow'sTransferPair/VectorLoader.loadcannot rebind buffers across the boundary.CometColumnarPythonInput.copyVectorwalks the trees in lockstep andsetBytes-copies each buffer. This is the bridge between the two allocator trees.BasicPythonArrowOutputconstructs anArrowStreamReaderagainst a per-iteratorBufferAllocator;loadNextBatchdecodes the IPC frames coming back from the Python worker directly into aVectorSchemaRootallocated from that allocator. EachFieldVectorin the root is wrapped in anArrowColumnVectorand emitted as aColumnarBatch.CometMapInBatchExecconsumes those vectors as-is - it just unwraps the single struct child and forwards them. Nothing else in the JVM owns those buffers, so there is no boundary to cross.In short, the input has two pre-existing trees in different allocators (memcpy required); the output has one tree, decoded straight into the right allocator (no copy possible or needed). The future allocator-unification work that would unlock
TransferPairon input does not apply to output - the output is already as good as it can get.Plan (Spark 4.0)
Baseline (
spark.comet.exec.pyarrowUdf.enabled=false):Optimized (
spark.comet.exec.pyarrowUdf.enabled=true):What changes are included in this PR?
CometColumnarPythonInputunderspark/src/main/spark-4.x/.../execution/python/. Extends Spark'sprivate[python] PythonArrowInput[Iterator[ColumnarBatch]]; implementswriteNextBatchToArrowStreamby walking the destination struct's children, allocating each sized to match the corresponding Comet column, collecting buffer addresses, and issuing one bulk copy for the whole tree.CometArrowPythonRunnerper Spark minor (4.0, 4.1, 4.2) under the same package. 4.0 extendsBasePythonRunnerdirectly because Spark 4.0'sBaseArrowPythonRunneris bound toIterator[InternalRow]; 4.1/4.2 extend the genericBaseArrowPythonRunner[IN, OUT]. All three mix inCometColumnarPythonInputplus Spark'sBasicPythonArrowOutput.CometVectorIpcCopierin comet-common crosses the shading boundary using onlylongprimitives. comet-common shadesorg.apache.arrow.*intoorg.apache.comet.shaded.arrow.*; comet-spark references the unshaded Spark Arrow classes. The helper does the byte-level walk over the source vector tree inside common, exposingbufferReadableBytes(): long[],valueCounts(): int[], andcopyBuffersToAddresses(addresses: long[]): Unit. No shaded type crosses the API.CometMapInBatchExecloses all row plumbing (rowIterator,BatchIterator,ContextAwareIterator,InternalRow(_)wrap). FeedsIterator[ColumnarBatch]straight to the runner; on output, unwraps the single struct column into the user's output columns as before.isBarrieris propagated throughRDD.barrier()somapInArrow(..., barrier=True)keeps its gang-scheduling semantics.EliminateRedundantTransitionsrewrite unchanged in shape: matchesColumnarToRow + (PythonMapInArrowExec | MapInPandasExec | MapInArrowExec)over a columnar Comet child.ArrowPythonRunnerconstructors and the 4.0+MapInArrowExecrename. Shared 4.x bits live inspark-4.x/.../Spark4xMapInBatchSupport.scala; per-minor shims provide only the runner factory.PythonArrowInputtrait has a different contract (writeIteratorToArrowStreamone-shot vs 4.x'swriteNextBatchToArrowStreambatch-at-a-time) and a separate implementation has not been written. The matchers in the 3.4 / 3.5 shims returnNone; vanilla Spark handles the operation. 3.5 support can be added later if there is user demand.spark.comet.exec.pyarrowUdf.enabled, defaultfalsewhile experimental.Limitations
mapInArrowandmapInPandas. Scalar pandas UDFs (@pandas_udf) and grouped operations (applyInPandas) are not yet supported.Exchangeoutputs rows and breaks the precondition.RootAllocator, so cross-rootTransferPaircannot be used. A future PR that has the readers allocate fromArrowUtils.rootAllocatorwould unlock zero-copy.How are these changes tested?
CometMapInBatchSuiteunderspark/src/test/spark-4.x/covers the JVM-side rule and an end-to-end check: constructs aMapInArrowExecover a stubCometPlanleaf and assertsEliminateRedundantTransitionsrewrites it toCometMapInBatchExec; runs a Parquet →mapInArrowquery with primitives + nullable varchar and asserts row-equivalence with the un-rewritten output.test_pyarrow_udf.pycoversmapInArrowandmapInPandasend-to-end against a real Python worker: nulls, empty input, Python exception propagation, decimal / date / timestamp / array / struct types, post-shuffle correctness, andbarrier=Truegang scheduling. 24 / 24 cases pass locally on Spark 4.0.pyarrow_udf_test.ymlworkflow runs the pytest module on every PR against Spark 4.0 (covers the 4.x shim path;pathsallowlist scoped to the feature files).Wall-clock benchmark (
benchmark_pyarrow_udf.py, 1M rows, 5 iters, local[2], Spark 4.0.2 / PySpark 4.0.1):The benchmark UDF is a pure passthrough on
local[2], so most of the wall time is Spark's Python fork / IPC overhead. Real UDFs (PyArrow compute, pandas ops, model inference) increase the per-row Python cost and shrink the speedup ratio. The bulk-copy path improves the wide-row case the most because that's where the row path's per-InternalRowoverhead is most concentrated.